Elixirのプロセス生成とプロセス間通信
Elixirは並行処理能力に優れたRubyライクなシンタックスを持つ関数型言語です。
前回のエントリではElixirの概要やシンタックスの紹介を簡単にしました。
今回はElixirの複数プロセスを使ったプログラミングについて書いていきます。
Elixirにはプロセス周りの実装をサポートしてくれるOTPというライブラリがあるのですが、まずはOTPを使用しないプログラミングを扱います。
軽量プロセス
Elixirのプロセスは軽量プロセスと言われていて、OSのプロセスやスレッドではなくGreenスレッドです。
カーネルではなくVMでスケジューリングされるので軽量、コンテキストスイッチが発生しないという利点があります。
1軽量プロセスで約300ワードです。
プロセス生成
Elixirで新しいプロセスを生成するのは簡単です。
spawn関数を呼び出し、戻り値に生成したプロセスIDを受け取ります。
defmodule Hoge do def say(message) do IO.puts "Hello #{message}!" end end child_pid = spawn(Hoge, :say, ["World"])
spawn関数の引数は以下の通りです
引数 | 意味 |
---|---|
第一引数 | モジュール名 |
第二引数 | 生成したプロセスが実行する関数 |
第三引数 | 関数に渡す引数(この例の場合はsay関数に渡す文字列) |
プロセス間のメッセージ送受信
次のサンプルコードで説明していきます。
# 別プロセスとして生成される処理 defmodule Hoge do def say do receive do # メインプロセスからのメッセージを待ち続ける {sender, msg} -> send sender, {:ok, "Hello #{msg}" } end end end # メインプロセスの処理 child_pid = spawn(Hoge, :say, []) send child_pid, {self, "World!"} receive do # 生成したプロセスからのメッセージを待ち続ける {:ok, message} -> IO.puts message end
処理の流れは次のとおりです。
1. プロセスを生成し、メッセージを送信する
send関数に送信先のプロセスIDと送信するメッセージを指定しています。
この例の場合、直前に生成したプロセスにメッセージを送信します。
child_pid = spawn(Hoge, :say, []) send child_pid, {self, "World!"}
self関数は自分のプロセスIDを返却します。send関数は第一引数に送信先のプロセスID、第二引数に送信するメッセージを指定します。
つまりここでは子プロセスに自分のプロセスIDと文字列を送信しています。
2. 子プロセスのメッセージ受信、メッセージ送信
次に子プロセスはreceive関数でメインプロセスからのメッセージを待ち続けます。
メッセージを受信するとメインプロセスに対して{:ok, "hello #{msg}"}のタプルを返信します。
def greet do receive do # メインプロセスからのメッセージを待ち続ける {sender, msg} -> send sender, {:ok, "Hello #{msg}" } # メインプロセスにメッセージを送信する(senderはメインプロセスのプロセスID) end end
3. メインプロセスのメッセージ受信
メインプロセスもreceive関数で子プロセスからのメッセージを待ち続けます。
メッセージを受信すると、標準出力にメッセージを出力します。
receive do # 生成したプロセスからのメッセージを待ち続ける {:ok, message} -> IO.puts message end
receive関数
プロセスはメッセージを受信するとプロセス毎にあるmailboxにメッセージを保存します。
そしてreceive関数はブロックで指定したパターンにマッチしたメッセージを探し、マッチしない場合はマッチするメッセージを受信するまで待ち続けます。
以下の場合、{:ok, messagge}の形式のメッセージを受信するまで待ち続けることになります。
送信側
send sender, {:ok, "Hello #{msg}" }
受信側
receive do {:ok, message} -> IO.puts message end
プロセスの監視
子プロセスの死を検知するには、プロセスを生成するときにプロセス同士を関連づける必要があります。 関連づけるにはspawnではなくsapwn_link関数を使います。
defmodule Hoge do import :timer, only: [ sleep: 1 ] def end_func do sleep 500 exit(:boom) end def run do spawn_link(Hoge, :end_func, []) #spawn_linkでプロセスを生成 receive do msg -> IO.puts "MESSAGE RECEIVED: #{inspect msg}" after 1000 -> IO.puts "Nothing happened" end end end Hoge.run # => ** (EXIT from #PID<0.56.0>), :boom
spawn_linkで子プロセスを生成すると、子プロセスがexit関数を呼び出して処理を終了したときに、メインプロセスはメッセージを受け取ることができます。
モニタリング
プロセスの終了をモニタリングするにはspawn_monitor関数を使います。この関数はプロセスを生成するとプロセスIDとともにプロセスへの参照を返します。
spawn_monitor(Hoge, :end_func, []) # => {#PID<0.62.0>,#Reference<0.0.3.91>}
プロセスが終了した場合、:DOWNメッセージとともにプロセスへの参照を受け取ります。
defmodule Hoge do import :timer, only: [ sleep: 1 ] def end_func do sleep 500 exit(:boom) end def run do spawn_monitor(Hoge, :end_func, []) #spawn_monitorでプロセスを生成 receive do msg -> IO.puts "MESSAGE RECEIVED: #{inspect msg}" after 1000 -> IO.puts "Nothing happened" end end end Hoge.run # => {:DOWN,#Reference<0.0.3.91>,:process,#PID<0.62.0>,:boom}
複数プロセスでフィボナッチ数の計算
最後に複数プロセス処理の複雑な例を見てみます。 Programming Elixirで紹介されているフィボナッチ数列計算の例です。
概要図
だいたいこんな感じです
概要
このプログラムでは計算処理の指示を出すSchedulerとフィボナッチ数の計算処理を行うWorker(Schedulerから生成される)が登場します。複数のフィボナッチ数の計算をするのですが、1プロセスで逐次的に処理をしていては時間がかかるので複数プロセスを使って同時に実行します。
処理の概要は次のとおりです。
順番 | 処理内容 |
---|---|
1 | SchedulerはWorkerのプロセスを複数生成する |
2 | Workerは:readyのメッセージをSchedulerに送信する |
3 | Schedulerは:readyのメッセージを受信すると:fibのメッセージをWorkerに送信して計算を指示する |
4 | Workerは:fibのメッセージを受信するとフィボナッチ数を計算する |
5 | Workerは計算が終わると、:answerのメッセージをSchedulerに送信する |
6 | Schedulerは:answerのメッセージを受け取ると、フィボナッチ数の結果を配列に追加する |
7 | ここまでの処理を与えられたフィボナッチ数計算の対象の数だけ、複数プロセスを使って繰り返す |
ではコードを見ていきましょう
# Schedulerから生成されるWorker defmodule FibSolver do def fib(scheduler) do send scheduler, { :ready, self } receive do { :fib, n, client } -> send client, { :answer, n, fib_calc(n), self } fib(scheduler) { :shutdown } -> exit(:normal) end end defp fib_calc(0), do: 0 defp fib_calc(1), do: 1 defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2) end defmodule Scheduler do def run(num_processes, module, func, fib_array) do (1..num_processes) |> Enum.map(fn(_) -> spawn(module, func, [self]) end) |> schedule_processes(fib_array, []) end defp schedule_processes(processes, queue, results) do receive do {:ready, pid} when length(queue) > 0 -> [ next | tail ] = queue send pid, {:fib, next, self} schedule_processes(processes, tail, results) {:ready, pid} -> send pid, {:shutdown} if length(processes) > 1 do schedule_processes(List.delete(processes, pid), queue, results) else Enum.sort(results, fn {n1,_}, {n2,_} -> n1 <= n2 end) end {:answer, number, result, _pid} -> schedule_processes(processes, queue, [ {number, result} | results ]) end end end fib_array = [37, 37, 37, 37, 37, 37] # ここがエントリポイント Enum.each 1..10, fn num_processes -> {time, result} = :timer.tc(Scheduler, :run, [num_processes, FibSolver, :fib, fib_array]) if num_processes == 1 do IO.puts inspect result IO.puts "\n # time (s)" end :io.format "~2B ~.2f~n", [num_processes, time/1000000.0] end
Elixirの特徴が目立つコードです。特に再帰が多く使われています。
Schedulerの処理
run関数
順番 | 処理内容 |
---|---|
1 | 引数で生成するプロセスの数とプロセス生成するモジュール、関数及び計算するフィボナッチ数の配列を受け取る(この例では10個のプロセスを生成する |
2 | spawnでプロセスを生成し、schedule_processes関数に生成したプロセスのPIDの配列と計算するフィボナッチ数の配列を渡す |
def run(num_processes, module, func, fib_array) do (1..num_processes) |> Enum.map(fn(_) -> spawn(module, func, [self]) end) # プロセス生成処理を10回繰り返す、戻り値のPIDは次の関数に渡す |> schedule_processes(fib_array, []) # パイプでつないでいるので第一引数にPIDの配列が渡される end
schdule_processes関数
順番 | 処理内容 |
---|---|
1 | receive関数でWorkerプロセスからの:readyメッセージを待ち続ける |
2 | :readyメッセージを受け取るとキュー([37,37,37,37,37,37]のこと)に入っている先頭の数を:fibメッセージでWorkerプロセスに送信する |
3 | 残ったキューを引数に再帰処理に入り、再度receive関数でメッセージを待ち続ける(生成したプロセスの数だけ、:readyメッセージを受ける) |
4 | Workerプロセスからの:answerメッセージを受け取った場合、計算したフィボナッチ数を結果用の配列に追加し再帰処理に入る |
5 | :readyメッセージを受け取った時にキューの要素がない場合、Workerに:shutdownメッセージを送信する |
6 | :shutdownメッセージ送信後、WorkerのPIDをプロセスIDの配列(processes)から削除する |
7 | :shutdownメッセージ送信後、プロセスIDの配列の要素数が1の場合最後のプロセスからのメッセージなので、結果の配列をソートして処理を終了する |
やや複雑ですね。。
Worker(FibSolver)の処理
fib関数
順番 | 処理内容 |
---|---|
1 | スケジューラに:readyメッセージを送信しreceive関数でSchedulerからの:fibメッセージを待ち続ける |
2 | :fibメッセージを受信するとフィボナッチ数の計算をしてSchedulerに:answerメッセージを送信する |
3 | ここまでの処理を再帰で繰り返す(計算が終わった後、:readyメッセージを再度スケジューラに送信して:fibメッセージもしくは:shutdownメッセージを待ち続ける |
4 | :shutdownメッセージを受け取ったらexitで終了する |
実行結果
実行してみるとこんな感じでフィボナッチ数と各プロセスの処理時間が出力されます
elixir fib.exs [{37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}, {37, 24157817}] # time (s) 1 8.62 2 4.39 3 4.34 4 4.25 5 4.90 6 4.91 7 4.69 8 5.03 9 4.74 10 4.61
まとめ
Elixirでは非常に簡単にプロセス生成、プロセス間でのメッセージ通信のコードを実装することができます。
今回の例であげると、子プロセスの終了通知をメインプロセスが受け取る処理が分かりやすいと思います。
他の言語であれば、同時に別のプロセス(もしくはスレッド)がメッセージを送信してきた場合などの考慮が必要になります(おそらく、キュー、セマフォロックなどを使ってメッセージ送受信をすることになる)が、Elixirでは不要です。
このあたりはマルチスレッド、マルチプロセスを使ったプログラミングで辛い思いをしてきた方にはかなりありがたいのではないでしょうか?
とは言っても、最後の例のように複雑な処理を一から実装するのはやはり手間がかかりますしコードもどんどん分かりにくくなっていきます。
次回はプロセス周りの実装をサポートするOTPライブラリを使ったプロセス間通信を説明します。